Theory for this exercise
Spark was installed previously.
To use the Web UI, make sure Spark has been started with the scripts in sbin:
$SPARK_HOME/sbin/start-all.sh
Then open the Web UI on port 8080 (e.g., http://localhost:8080).
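Before opening the UI, you can optionally confirm that the Master and Worker processes are running with jps (a standard JDK tool):
jps
If Spark started correctly, the list should include Master and Worker processes.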
Install Maven with:
sudo apt update
sudo apt install maven
Verify the installation:
mvn -version
If Maven is installed correctly, it prints something like:
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 1.8.0_442, vendor: Private Build, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "5.15.167.4-microsoft-standard-wsl2", arch: "amd64", family: "unix"
mvn archetype:generate -DgroupId=com.example -DartifactId=SparkApp -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
This creates a project directory with the following structure:
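A typical quickstart layout (the exact tree may vary slightly with the archetype version):
SparkApp
├── pom.xml
└── src
    ├── main/java/com/example/App.java
    └── test/java/com/example/AppTest.java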
Add the following dependencies and the compiler plugin to pom.xml:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
After these additions, the full pom.xml looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>SparkApp</artifactId>
<version>1.0-SNAPSHOT</version>
<name>SparkApp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Create an input txt file and put it into HDFS:
xin chao cac ban xin chao Hadoop Bid Data minh xin tu gioi thieu minh len la Luu Vinh Tuong
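The HDFS paths below are assumptions that match the WordCount code that follows; a typical upload sequence looks like this:
hdfs dfs -mkdir -p /input
hdfs dfs -put input_1.txt /input/input_1.txt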
package spark.main;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Spark Word Count");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
String inputPath = "hdfs://vinhtuong-master:9000/input/input_1.txt";
String outputPath = "hdfs://vinhtuong-master:9000/output/result";
JavaRDD<String> textFile = sc.textFile(inputPath).cache();
JavaPairRDD<String, Integer> wordCounts = textFile
.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
.mapToPair(word -> new Tuple2<>(word.replaceAll("[^a-zA-Z]", "").toLowerCase(), 1))
.reduceByKey(Integer::sum);
wordCounts.coalesce(1).saveAsTextFile(outputPath);
System.out.println("Word Count completed. Output saved to " + outputPath);
}
}
}
mvn clean package
After packaging, the application jar is produced in the target directory. Run it with spark-submit:
spark-submit --class spark.main.WordCount --master local[*] target/SparkApp-1.0-SNAPSHOT.jar
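To inspect the result (assuming the output path used in the code above), read the part file back from HDFS:
hdfs dfs -cat /output/result/part-00000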
name,age,city
Alice,30,New York
Bob,25,Los Angeles
Charlie,35,Chicago
David,40,Houston
Emma,22,San Francisco
Frank,28,Seattle
Grace,33,Boston
Henry,27,Denver
package spark.main;
import org.apache.spark.sql.*;
public class SparkSQLExample {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Spark SQL Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("hdfs://vinhtuong-master:9000/input/people.csv");
System.out.println("Du lieu ban dau:");
df.show();
System.out.println("Schema:");
df.printSchema();
System.out.println("Chon name va age:");
df.select("name", "age").show();
System.out.println("Loc nhung nguoi tren 25 tuoi:");
df.filter("age > 25").show();
System.out.println("Nhom theo so tuoi va dem so nguoi:");
df.groupBy("age").count().show();
df.write().mode("overwrite").csv("hdfs://vinhtuong-master:9000/output/people");
spark.stop();
}
}
mvn clean package
spark-submit --class spark.main.SparkSQLExample --master local[*] target/SparkApp-1.0-SNAPSHOT.jar
Apart from the raw DataFrame written at the end, the HDFS output contains nothing from these queries, because the select/filter/groupBy results are only shown on screen and not saved (see the code).
So check the console output of the run above instead.
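If one of the derived results should be persisted as well, a write call could be added; this is only a sketch, and the output path is an assumption:
// hypothetical: save the filtered result instead of only showing it
df.filter("age > 25").write().mode("overwrite").csv("hdfs://vinhtuong-master:9000/output/people_over_25");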
Get the dataset here.
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_1 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-1")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
// number of distinct customers
// subtract 1 because one of the distinct values is the null CustomerID
long cntCustomers = data.select("CustomerID").distinct().count() - 1;
// number of distinct products
long cntProducts = data.select("StockCode").distinct().count();
// number of distinct invoices
long cntInvoices = data.select("InvoiceNo").distinct().count();
// print the results
System.out.println("Number of distinct customers: " + cntCustomers);
System.out.println("Number of distinct products: " + cntProducts);
System.out.println("Number of distinct invoices: " + cntInvoices);
}
}
mvn clean package
spark-submit --class spark.main.part_1 target/SparkApp-1.0-SNAPSHOT.jar
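As an alternative to subtracting 1 for the null CustomerID, the null rows can be filtered out explicitly; a small sketch using the same Dataset:
// sketch: count distinct customers after dropping rows with a null CustomerID
long cntCustomersAlt = data.filter(data.col("CustomerID").isNotNull())
        .select("CustomerID").distinct().count();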
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_2 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-2")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
// total number of rows (the CustomerID column, including nulls)
long cntCustomers = data.select("CustomerID").count();
// number of rows with no CustomerID
long cntCustomersNoInfor = data.select("CustomerID").filter(data.col("CustomerID").isNull()).count();
double ratio = (double) cntCustomersNoInfor / cntCustomers * 100;
System.out.printf("Ratio no information: %f \n", ratio);
}
}
mvn clean package
spark-submit --class spark.main.part_2 target/SparkApp-1.0-SNAPSHOT.jar
About 29.93% of the rows have no customer information.
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_3 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-2")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
data.createOrReplaceTempView("data");
spark.sql("select Country, sum(Quantity) as count from data group by Country order by count desc").show();
}
}
mvn clean package
spark-submit --class spark.main.part_3 target/SparkApp-1.0-SNAPSHOT.jar
The output is sorted from highest to lowest => the third-largest is EIRE with count = 142637.
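For reference, the same aggregation can be expressed with the DataFrame API instead of SQL; a sketch (calls fully qualified to keep it self-contained):
// sketch: group by Country and sum Quantity, sorted descending
data.groupBy("Country")
    .agg(org.apache.spark.sql.functions.sum("Quantity").alias("count"))
    .orderBy(org.apache.spark.sql.functions.col("count").desc())
    .show();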
package spark.main;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;
public class part_4 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-4")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
data.where("Description is not null").flatMap(new FlatMapFunction<Row, Row>() {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
public Iterator<Row> call(Row r) throws Exception {
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
List<Row> listItemRow = new ArrayList<Row>();
for (String item : listItem) {
listItemRow.add(RowFactory.create(cnt, item, 1));
cnt++;
}
return listItemRow.iterator();
}
}, Encoders.row(new StructType()
.add("number", "integer")
.add("word", "string")
.add("lit", "integer")))
.createOrReplaceTempView("data");
spark.sql("select word, count(lit) as count from data group by word order by count desc").show();
}
}
mvn clean package
spark-submit --class spark.main.part_4 target/SparkApp-1.0-SNAPSHOT.jar
Change the query to asc to sort from lowest to highest:
spark.sql("select word, count(lit) as count from data group by word order by count asc").show();
The words shown are the ones that appear least often in the Description column.
package spark.main;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class part_5 {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-4")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://vinhtuong-master:9000/input/retails.csv");
data.filter(data.col("Country").equalTo("United Kingdom")).createOrReplaceTempView("data");
spark.sql("select Description, sum(Quantity) as count from data group by Description order by count desc").show();
}
}
mvn clean package
spark-submit --class spark.main.part_5 target/SparkApp-1.0-SNAPSHOT.jar
Checking the full name in the CSV: WORLD WAR 2 GLIDERS ASSTD DESIGNS, with a total quantity of 48326.
Use Spark to receive streaming data from a TCP socket (the port is set to 9999).
Open port 9999 on the machine and type input into it.
Note: open another terminal to run this command; think of it as the server.
nc -lk 9999
Then type sentences to be counted; each line you type is picked up by the next micro-batch (batches are created every 2 seconds, see the code below).
To check whether the port is open:
netstat -an | grep 9999
If the port is open, its state will be LISTEN:
tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN
package spark.main;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
// Set up Spark configuration and streaming context
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
// Create a DStream that connects to a socket
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
System.out.println("Dang nhan du lieu tu socket...");
lines.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Du lieu nhan duoc: ");
rdd.collect().forEach(System.out::println);
}
});
// Process each RDD from the DStream
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
// Print the counts to the console
wordCounts.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Word Count:");
rdd.collect().forEach(System.out::println);
}
});
// Start the streaming computation
jssc.start();
System.out.println("Streaming Started");
jssc.awaitTermination();
}
}
mvn clean package
spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2)); // 2-second batch interval
Once these two lines appear, the program is running normally. Next, verify that data is actually received from port 9999 and that the word count works.
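For example, typing hello spark hello into the nc terminal should produce, within the next 2-second batch, counts along the lines of (hello,2) and (spark,1) (ordering may vary).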
Same idea as version 1, but now we track which words have appeared over the entire streaming session.
Additional changes:
- updateStateByKey() keeps a cumulative count of how many times each word has appeared.
- jssc.checkpoint("checkpoint"); so that Spark can maintain this state across batches.
- updateFunction tells Spark how to add the counts from the current batch to the running total.
package spark.main;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.*;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
// Set up Spark configuration and streaming context
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
jssc.checkpoint("checkpoint");
// Create a DStream that connects to a socket
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
System.out.println("Dang nhan du lieu tu socket...");
lines.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Du lieu nhan duoc: ");
rdd.collect().forEach(System.out::println);
}
});
// Process each RDD from the DStream
JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.map(word -> word.toLowerCase());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey(Integer::sum);
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (newValues, state) -> {
int sum = state.orElse(0);
for (int val : newValues) {
sum += val;
}
return Optional.of(sum);
};
JavaPairDStream<String, Integer> cumulativeWordCounts = wordCounts.updateStateByKey(updateFunction);
// Print the counts to the console
cumulativeWordCounts.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
System.out.println("Word Count:");
rdd.collect().forEach(System.out::println);
}
});
// Start the streaming computation
jssc.start();
System.out.println("Streaming Started");
jssc.awaitTermination();
}
}
mvn clean package
spark-submit --class spark.main.StreamingWordCount target/SparkApp-1.0-SNAPSHOT.jar
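With this stateful version, typing hello in one batch and then hello spark in a later batch should eventually show cumulative counts of roughly (hello,2) and (spark,1), because updateStateByKey carries the totals across batches.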
package spark.main;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
public class Map {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Integer> list = Arrays.asList(10,20,30);
JavaRDD<Integer> data = sc.parallelize(list);
data = data.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
data.collect().forEach(v -> System.out.println(v));
}
}
}
Each element of 10 20 30 is multiplied by 2, giving 20 40 60.
package spark.main;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
public class Filter {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Integer> list = Arrays.asList(10,11,12,13,14,15);
JavaRDD<Integer> data = sc.parallelize(list);
data = data.filter(new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Integer v1) throws Exception {
if(v1 % 5 == 0) return true;
return false;
}
});
data.collect().forEach(v -> System.out.println(v));
}
}
}
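Expected output: 10 and 15, the only values in the list that are divisible by 5.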
package spark.main;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class GroupByKey {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("C", 3),
new Tuple2<String, Integer>("A", 1),
new Tuple2<String, Integer>("B", 4),
new Tuple2<String, Integer>("A", 2),
new Tuple2<String, Integer>("B", 5));
JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
data.groupByKey().collect().forEach(s -> System.out.println(s));
}
}
}
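Expected output: each key grouped with its values, e.g. A with [1, 2], B with [4, 5], C with [3] (group order and exact formatting may vary).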
package spark.main;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
public class ReduceByKey {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Demo")
.setMaster("local[2]");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("C", 3),
new Tuple2<String, Integer>("A", 1),
new Tuple2<String, Integer>("B", 4),
new Tuple2<String, Integer>("A", 2),
new Tuple2<String, Integer>("B", 5));
JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
data = data.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
data.collect().forEach(v -> System.out.println(v));
}
}
}
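Expected output (order may vary): (A,3), (B,9), (C,3).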
sudo apt-get update
sudo apt-get install apt-transport-https curl gnupg -yqq
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update
sudo apt-get install sbt
sbt --version
sbt clean package
Note: the first build needs to download a lot of dependencies, so it takes quite a while.
To find the result, search the output for "top endpoint".
constants.py:
PATH="/input/LDA/"
NUM_TOPICS=3
MAX_INTER=10
OUTPUT_PATH="/output/LDA/"
preprocessing.py:
import nltk
from nltk.corpus import stopwords
import re as re
nltk.data.path.append("/home/hadoopvinhtuong/nltk_data")
def preprocessing(rdd):
# pre processing data
reviews = rdd.map(lambda x: x['Content']).filter(lambda x: x is not None)
StopWords = stopwords.words("english")
tokens = reviews.map(lambda document: document.strip().lower())
tokens = tokens.map(lambda document: re.split(" ", document))
tokens = tokens.map(lambda word: [x for x in word if x.isalpha()])
tokens = tokens.map(lambda word: [x for x in word if len(x) > 3])
tokens = tokens.map(lambda word: [x for x in word if x not in StopWords])
return tokens
- Main code file: LDA.py
import os
import shutil
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer
from preprocessing import preprocessing
import constants
# init Spark Context
conf = SparkConf().setAppName("Spark ML").setMaster("local[2]")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# read raw data
data = sqlContext.read.format("csv").options(header='true', inferSchema='true').load(os.path.realpath(constants.PATH))
rdd = data.rdd
# preprocessing data
tokens = preprocessing(rdd)
tokens = tokens.zipWithIndex()
df = sqlContext.createDataFrame(tokens, ["content", "index"])
# vector data
cv = CountVectorizer(inputCol="content", outputCol="features", vocabSize=500, minDF=3.0)
cvModel = cv.fit(df)
vectorizedToken = cvModel.transform(df)
# clustering
lda = LDA(k=constants.NUM_TOPICS, maxIter=constants.MAX_INTER)
model = lda.fit(vectorizedToken)
# get vocab
vocab = cvModel.vocabulary
topics = model.describeTopics()
topicsRdd = topics.rdd
# result
result = model.transform(vectorizedToken)
result.show()
# save model
if(os.path.isdir(constants.OUTPUT_PATH + "/Model_CountVectorizer")):
shutil.rmtree(constants.OUTPUT_PATH + "/Model_CountVectorizer")
cvModel.save(constants.OUTPUT_PATH + "/Model_CountVectorizer")
if(os.path.isdir(constants.OUTPUT_PATH + "/Model_LDA")):
shutil.rmtree(constants.OUTPUT_PATH + "/Model_LDA")
model.save(constants.OUTPUT_PATH + "/Model_LDA")
spark-submit LDA.py
Model_CountVectorizer/data
Model_CountVectorizer/metadata
Model_LDA/data
Model_LDA/metadata
=> The models have been saved to HDFS.
Create a new file named app.py:
from pyspark.sql import SparkSession
from flask import Flask, request
from pyspark.ml.clustering import LocalLDAModel
from pyspark.ml.feature import CountVectorizerModel
import constants
from preprocessing import preprocessing
spark = SparkSession.builder \
.appName("Spark MLlib with Flask") \
.master("local[2]") \
.getOrCreate()
# Initialize the Flask app
app = Flask(__name__)
@app.route("/api/predict")
def predict():
document = request.args.get("document")
# Load the saved CountVectorizer and LDA models
countVectorizerModel = CountVectorizerModel.load(constants.OUTPUT_PATH + "/Model_CountVectorizer")
ldaModel = LocalLDAModel.load(constants.OUTPUT_PATH + "/Model_LDA")
# Create a DataFrame from the input text
documentDF = spark.createDataFrame([(document, )], ["Content"])
rdd = documentDF.rdd
tokens = preprocessing(rdd)
tokens = tokens.zipWithIndex()
df = spark.createDataFrame(tokens, ["content", "index"])
vectorizedToken = countVectorizerModel.transform(df)
# Predict the topic with the LDA model
result = ldaModel.transform(vectorizedToken)
result = result.select("topicDistribution")
result.show(truncate=False)
pred = result.rdd.first()
return {"predict": find_max_index(pred['topicDistribution'])}
@app.route("/")
def home():
return "Flask app is running!"
def find_max_index(arr):
index = 0
max_val = 0
for i in range(len(arr)):
if arr[i] > max_val:
max_val = arr[i]
index = i
return index
if __name__ == "__main__":
app.run(debug=True)
python3 app.py
http://localhost:5000/api/predict?document=Hôm nay tôi đi Hà Nội để gập các cán bộbộ hello xin chào các anh để các em có thể ra ngoài
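Note: in practice the document query parameter should be URL-encoded (spaces become %20 or +); most browsers do this automatically when the URL is pasted into the address bar.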